package com.lzp.java.concurrent.threadpool;

import java.util.concurrent.*;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 可暂停的线程池（继承于普通线程池）
 *
 * 1. 继承ThreadPoolExecutor创建线程池
 * 2. 在任务前后使用钩子方法，协助实现
 * 3. 利用Condition对象
 * --------------
 * 特别注意的是，isPaused是布尔变量，对其赋值时原子操作，针对于该变量的原子操作可以只通过volatile保证可见性就可以了
 *
 * @author lzp
 * @date 2020/03/17
 */
public class PauseableThreadPool extends ThreadPoolExecutor {
    /**
     * 显式锁，上锁保证标志位安全
     */
    private final ReentrantLock lock = new ReentrantLock();

    /**
     * Condition由lock引申出来
     */
    private Condition unpaused = lock.newCondition();

    /**
     * 暂停标记位
     */
    private boolean isPaused;

    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    /**
     * 每个任务执行前执行，识别标记位
     *
     * @param t
     * @param r
     */
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        lock.lock();
        try {
            while (isPaused) {
                unpaused.await(); // 阻塞挂起，释放锁
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    /**
     * 钩子方法--暂停线程池
     */
    private void pause() {
        lock.lock();
        try {
            isPaused = true;
        } finally {
            lock.unlock();
        }
    }

    private void resume() {
        lock.lock();
        try {
            isPaused = false;
            unpaused.signalAll(); // 通知唤醒
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        PauseableThreadPool pool = new PauseableThreadPool(10, 20, 100, TimeUnit.SECONDS, new LinkedBlockingDeque<>());
        Runnable task = () -> {
            System.out.println("任务被执行" + Thread.currentThread().getName());
            try {
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        for (int i = 0; i < 10000; i++) {
            pool.execute(task);
        }
        TimeUnit.SECONDS.sleep(1);
        pool.pause();
        System.out.println("触发线程池暂停操作");
        TimeUnit.SECONDS.sleep(3);
        pool.resume();
        System.out.println("恢复线程池执行");
    }
}
